psycopg2によるPython2.7からのAmazon Redshiftアクセスサンプル

psycopg2によるPython2.7からのAmazon Redshiftアクセスサンプル

Clock Icon2015.12.31

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

2015年も本日で最終日となりました。読者の皆様におかれましては、しっかり仕事を納めつつ年末(及び年始)を過ごされていますでしょうか。そんな私も年末は仕事を無事納める事が出来、年末を落ち着いて過ごす傍らで社内で個人的に取り組んでいた可視化事案の仕組みを全面的にPythonで置き換えるという作業をちょいちょい進めておりました。

可視化に関する作業の中にはAmazon Redshiftへのアクセスを行なう部分も含まれていますが、この部分についてもPythonで利用出来るライブラリを使って処理を実現する事が可能です。そこで当エントリではPythonからライブラリ『psycopg2』を使ってAmazon Redshiftへのアクセスを行なう際のサンプルコードをまとめてみたいと思います。(psycopg2はPythonで最もポピュラーなPostgreSQLライブラリです。Amaozn RedshiftはPostgreSQLに準拠している為、同じ手法で活用する事が出来ました)

目次

 

環境準備

 

Amazon Linuxの場合

Amazon Linuxで環境を整える場合は以下コマンドで必要なライブラリの導入を行います。

$ sudo yum install -y gcc python27 python27-devel postgresql-devel
$ sudo pip install psycopg2

 

Mac OS Xの場合

次はMacについて。手元の環境では以下のコマンドで整いました。

$ sudo easy_install psycopg2

環境によっては、もしかしたら以下の設定も必要となるかも?

$ sudo apt-get install python-dev libpq-dev
$ sudo apt-get install python-setuptools
$ sudo easy_install psycopg2

 

用途別サンプルコード

ここからは、処理に応じたサンプルコードの紹介をして行きたいと思います。

 

CRUD・DDL

まずは基本的なデータベースアクセスから。ドキュメント等を参考に、以下の形でそれぞれSELECT/INSERT/UPDATE/DELETE、CREATE TABLE/DROP TABLEを実行しています。

CREATE TABLE文を記述する際の改行含みの文字列設定については以下の情報を参考にしました。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
 
import psycopg2

hostName = "XXXXXXXXXXXXXXXXXXXX"
databaseName = "XXXXXXXXX"
portNo = "5439"
userName = "XXXXXXXX"
password = "XXXXXXXXXXXXX"

conn = psycopg2.connect(
  host=hostName,
  database=databaseName,
  port=portNo,
  user=userName,
  password=password
);
print("psycopg2 connected.");


cursor = conn.cursor();
print("psycopg2 cursor opened.");

print "CREATE TABLE start.";
cursor.execute("DROP TABLE IF EXISTS public.table_from_python");
create_table_ddl = """
CREATE TABLE public.table_from_python ( 
  pyid INT NOT NULL encode lzo,
  pydate DATE NOT NULL encode delta, 
  pyname VARCHAR(30) NOT NULL encode lzo, 
  PRIMARY KEY(pyid))
  DISTSTYLE EVEN
  SORTKEY(pydate)
""";

cursor.execute(create_table_ddl);
print "CREATE TABLE end.";

print "INSERT start.";
cursor.execute("INSERT INTO public.table_from_python VALUES(1,'2015/12/25','AAあああ')");
cursor.execute("INSERT INTO public.table_from_python VALUES(2,'2015/12/26','BBいいい')");
cursor.execute("INSERT INTO public.table_from_python VALUES(3,'2015/12/27','CCううう')");
print "INSERT end.";

print "UPDATE start.";
cursor.execute("UPDATE public.table_from_python SET pyname = 'DDえええ' WHERE pyid = 3");
print "UPDATE end.";

print "DELETE start.";
cursor.execute("DELETE FROM public.table_from_python WHERE pyid = 2");
print "DELETE end.";

print "TRUNCATE start."
cursor.execute("TRUNCATE public.table_from_python");
print "TRUNCATE end."

print "DROP start.";
cursor.execute("DROP TABLE public.table_from_python");
print "DROP end.";

cursor.close();
print("psycopg2 cursor closed.");
conn.close();
print("psycopg2 connection closed.");

 

SELECT

SELECT文で結果を操作する際のサンプルコード。この辺り他のライブラリ等でもう少しスマートな方法で書けそうな気もしますが、ひとまずこのレベルで使えれば良いかなという事で。数値型や日付型のデータについては文字列として出力する場合、文字列型に何らかの形で変換する必要があります。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
 
import psycopg2

hostName = "XXXXXXXXXXXXXXXXXXXX"
databaseName = "XXXXXXXXX"
portNo = "5439"
userName = "XXXXXXXX"
password = "XXXXXXXXXXXXX"

conn = psycopg2.connect(
  host=hostName,
  database=databaseName,
  port=portNo,
  user=userName,
  password=password
);
print("psycopg2 connected.");


cursor = conn.cursor();
print("psycopg2 cursor opened.");


cursor.execute("SELECT COUNT(*) FROM public.orders");
print cursor.fetchone();

cursor.execute("SELECT * FROM public.orders ORDER BY order_date ASC, order_id ASC LIMIT 5");
results = cursor.fetchall()
for row in results:
  print "----"
  order_id = row[0];
  order_date = row[1];
  priority = row[2];
  ship_mode = row[6];
  customer_name = row[11];
  product_name = row[20];
  print "オーダーID:" + str(order_id);
  print "受注日:" + order_date.strftime('%Y/%m/%d')
  print "顧客名:" + customer_name;
  print "製品名:" + product_name;

cursor.close();
print("psycopg2 cursor closed.");
conn.close();
print("psycopg2 connection closed.");

上記コードを実行してみた結果は以下の通りとなります。(※ちなみに結果データについてはTableau社で良く用いられている『SuperStore』のデータを使っています)

$ ./python-to-redshift_select.py 
psycopg2 connected.
psycopg2 cursor opened.
(8369L,)
----
オーダーID:47873
受注日:2009/01/01
顧客名:吉津 正晃
製品名:Eldon Wave Desk Accessories
----
オーダーID:55554
受注日:2009/01/01
顧客名:中谷 成広
製品名:Xerox 1939
----
オーダーID:4871
受注日:2009/01/02
顧客名:久保田 智和
製品名:Global Leather Task Chair Black
----
オーダーID:24519
受注日:2009/01/02
顧客名:神保 雄二
製品名:Pressboard Data Binder Crimson 12 X 8 1/2
----
オーダーID:29187
受注日:2009/01/02
顧客名:河本 訓治
製品名:Xerox 217
psycopg2 cursor closed.
psycopg2 connection closed.

 

COPY

S3からのデータの投入(COPY)のサンプルコード。この辺りは上記処理と枠組みは同じです。TRUNCATEもこの中で出来ています。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
 
import psycopg2

hostName = "XXXXXXXXXXXXXXXXXXXX"
databaseName = "XXXXXXXXX"
portNo = "5439"
userName = "XXXXXXXX"
password = "XXXXXXXXXXXXX"

conn = psycopg2.connect(
  host=hostName,
  database=databaseName,
  port=portNo,
  user=userName,
  password=password
);
print("psycopg2 connected.");


cursor = conn.cursor();
print("psycopg2 cursor opened.");

cursor.execute("TRUNCATE xxxx.xxxxxxxxxx");

copy_statement = """
COPY xxxx.xxxxxxxxxx FROM 's3://xxxxxxxxxxxx/xxxxxxx/xxxxxxxxx.csv'
CREDENTIALS 'aws_access_key_id=XXXXXXXXXX;aws_secret_access_key=YYYYYYYYYY'
IGNOREHEADER 1
DATEFORMAT 'YYYY/MM/DD'
TIMEFORMAT 'auto'
""";

print "COPY start.";
cursor.execute(copy_statement);
print "COPY end.";

conn.commit();

cursor.close();
print("psycopg2 cursor closed.");
conn.close();
print("psycopg2 connection closed.");

 

VACUUM

VACUUM処理については上記で実施しているような形(トランザクション管理内)での実行が行えない為、21行目のset_isolation_levelメソッドでISOLATION_LEVELの設定変更を行ってからVACUUM文の実施を行っています。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
 
import psycopg2

hostName = "XXXXXXXXXXXXXXXXXXXX"
databaseName = "XXXXXXXXX"
portNo = "5439"
userName = "XXXXXXXX"
password = "XXXXXXXXXXXXX"

  conn = psycopg2.connect(
  host=hostName,
  database=databaseName,
  port=portNo,
  user=userName,
  password=password
);
print("psycopg2 connected.");

conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
cursor = conn.cursor();
print "VACUUM start.";
cursor.execute("VACUUM public.orders");
print "VACUUM end.";
cursor.close();
conn.close();

 

SQLファイルの読み込み

任意のSQLファイルの読み込み&実行も以下の形で行なう事が可能です。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
 
import psycopg2

hostName = "XXXXXXXXXXXXXXXXXXXX"
databaseName = "XXXXXXXXX"
portNo = "5439"
userName = "XXXXXXXX"
password = "XXXXXXXXXXXXX"

  conn = psycopg2.connect(
  host=hostName,
  database=databaseName,
  port=portNo,
  user=userName,
  password=password
);
print("psycopg2 connected.");

cursor = conn.cursor();
print("psycopg2 cursor opened.");

cursor.execute(open("/xxxx/xxxx/xxxx/select.sql","r").read());
print cursor.fetchone();

cursor.close();
conn.close();

以下の形でファイルを用意しておくと、

SELECT VERSION();

実行時には以下の様な形で出力が得られます。

psycopg2 connected.
psycopg2 cursor opened.
('PostgreSQL 8.0.2 on i686-pc-linux-gnu, compiled by GCC gcc (GCC) 3.4.2 20041017 (Red Hat 3.4.2-6.fc3), Redshift 1.0.1012',)

 

まとめ

以上、psycopg2を使ってのAmazon Redshiftへのアクセス方法に関するまとめでした。データの可視化を実現させるためには、Amazon Redshift等のDBにデータを取込む為のファイルベースのETL処理、及び取り込んでからのSQLによるELT処理等でプログラミング処理を必要とする局面は多いかと思います。Pythonは比較的手軽に扱えるスクリプト言語ですので今回ご紹介したようなDBアクセス、そしてファイル加工等の処理を実現させるために活用してみるのも検討してみてはいかがでしょうか。こちらからは以上です。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.